fluent-plugin-kinesisがAmazon Kinesis FirehoseとKPLに対応したので試してみた
ども、大瀧です。
OSSのデータコレクタFluentdのKinesisプラグインであるfluent-plugin-kinesisのバージョン1.0リリースされ、従来からサポートするAmazon Kinesis Streamに加えて、Amazon Kinesis FirehoseとKPL(Kinesis Producer Library)に対応しました!ためしてみた様子をレポートします!
セットアップ
動作確認環境
- OS : Ubuntu Server 14.04 Trusty
- Flunetd : td-agent 2.3.1
RubyGemsにデプロイ済みなので、td-agentに同梱されるtd-agent-gem
で簡単にインストールできます。td-agentとセットでインストールしました。
$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 563 100 563 0 0 1882 0 --:--:-- --:--:-- --:--:-- 1882 This script requires superuser access to install apt packages. :(略) Starting td-agent: * td-agent Processing triggers for libc-bin (2.19-0ubuntu6.6) ... $ sudo td-agent-gem install fluent-plugin-kinesis Fetching: concurrent-ruby-1.0.1.gem (100%) Successfully installed concurrent-ruby-1.0.1 Fetching: os-0.9.6.gem (100%) Successfully installed os-0.9.6 Fetching: middleware-0.1.0.gem (100%) Successfully installed middleware-0.1.0 Fetching: protobuf-3.6.7.gem (100%) Successfully installed protobuf-3.6.7 Fetching: fluent-plugin-kinesis-1.0.0.gem (100%) Successfully installed fluent-plugin-kinesis-1.0.0 Parsing documentation for concurrent-ruby-1.0.1 Installing ri documentation for concurrent-ruby-1.0.1 Parsing documentation for os-0.9.6 Installing ri documentation for os-0.9.6 Parsing documentation for middleware-0.1.0 Installing ri documentation for middleware-0.1.0 Parsing documentation for protobuf-3.6.7 Installing ri documentation for protobuf-3.6.7 Parsing documentation for fluent-plugin-kinesis-1.0.0 Installing ri documentation for fluent-plugin-kinesis-1.0.0 Done installing documentation for concurrent-ruby, os, middleware, protobuf, fluent-plugin-kinesis after 6 seconds 5 gems installed $
以前のバージョン(`0.x`)から、Kinesis Streamを利用するタイプ名が変更(`kinesis`→`kinesis_streams`)されています。アップグレードする場合は注意しましょう。
試しに、Apacheログ(/var/log/apache2/access.log
)をKinesis Firehoseのデリバリーストリームtakipone-test
に送信するように設定してみました。takipone-test
ではS3のバケットtakipone-firehosetest
に転送するよう、あらかじめ設定しています。いずれもオレゴン(us-west-2
)リージョンに作成しました。
<source> @type tail path /var/log/apache2/access.log pos_file /var/log/td-agent/httpd-access.log.pos tag apache.access format apache2 </source> <match **> @type kinesis_firehose region us-west-2 delivery_stream_name takipone-test </match> flush_interval 1 buffer_chunk_limit 1m try_flush_interval 0.1 queued_chunk_flush_interval 0.01 num_threads 15 detach_process 5
122〜127行目は、GitHubのREADMEにある推奨パラメータです。調節しつつ使いましょう。
併せて、いずれかの方法でAWSの認証情報(APIキーなど)を設定します。今回は、EC2で動作させるのでIAMロールにKinesisにアクセスする権限を与えて設定しました。GitHubのREADMEが参考になります。
では、td-agentを再起動して設定ファイルを読み込み、ab
でログファイルに書き込んでみます。
$ sudo service td-agent restart Restarting td-agent: * td-agent $ ab -n 10000 http://localhost/ This is ApacheBench, Version 2.3 <$Revision: 1528965 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking localhost (be patient) Completed 1000 requests Completed 2000 requests Completed 3000 requests Completed 4000 requests Completed 5000 requests Completed 6000 requests Completed 7000 requests Completed 8000 requests Completed 9000 requests Completed 10000 requests Finished 10000 requests : (略) $
しばらく待つと、S3のバケットにファイルが作成されました!
中身は、ApacheのログデータがJSON形式で格納されています。
{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"} {"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"} {"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"} :(略)
KPL(Kinesis Producer Library)も試してみます。以下のようにtd-agent.conf
を変更しました。併せて、Kinesis Streamのストリームtakipone-test
を作成して準備します。
<match **> @type kinesis_producer region us-west-2 stream_name takipone-test </match>
これでOKです。先ほどと同じく、ab
コマンドを何度か実行した後、ストリームからレコードを取り出してみます。空のレコードが数レコード続いてから、Fluentdから送ったレコードが出てきます。
$ aws kinesis get-shard-iterator --region us-west-2 --output json --stream-name takipone-test --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON { "ShardIterator": "AAAAAAAAAAEsTU/Sld4XveRKzirhuga3xrC7K2BJZ5lT72OvvkJq5qvaYjkJvE7NOT4btXWTaM1D7cTjFEkavThzYTa4S2c2uEbZHPUCWx/Im5bw9bcP3oNPeeY5B9Y9MPGPH+P42Gptz65BupY4RlkDQ+HCmhSIhiaUTcxQGajfIvo9x2meIJhHGOMwkegEkl5W5RBavkP9yC2DCqnOYbPo4TuH2tbr" } $ aws kinesis get-records --region us-west-2 --output json --shard-iterator AAAAAAAAAAEsTU/Sld4XveRKzirhuga3xrC7K2BJZ5lT72OvvkJq5qvaYjkJvE7NOT4btXWTaM1D7cTjFEkavThzYTa4S2c2uEbZHPUCWx/Im5bw9bcP3oNPeeY5B9Y9MPGPH+P42Gptz65BupY4RlkDQ+HCmhSIhiaUTcxQGajfIvo9x2meIJhHGOMwkegEkl5W5RBavkP9yC2DCqnOYbPo4TuH2tbr { "Records": [], "NextShardIterator": "AAAAAAAAAAEQopt7XsozsDxyBVPuDkZCZtv3vPiGip7fG1Yy9ZruraEgS+zLjfGupIqHU+xv7nsKoCZ4GfUfmpRs6IoaQy+FkDfxziLXqYkJ+ZA2MTQXYjQZmlXEpZAwQm/pRwe5jCMUKAEsyxu24gFbith3p8AnjaRDOWIF+Z2wagu6ENFp084qJAkr7+R/3NZ16i8wpNJbvA4G6HkJDhcxxmUZXiax", "MillisBehindLatest": 62926000 } $ aws kinesis get-records --region us-west-2 --output json --shard-iterator AAAAAAAAAAFlrcyI/mH5mU6nVlojqU6vqI/hFFJnoTDH1qMfP6Os5/iV3Mna+bWrWEgOe00djhGXr0cwDdlRDdg5dDsR/Mmjj/cVsAC2aGB6+JV11TMoQE0qvdGcZ+dfkIGKH3epEW5ZXgDjIb8YVXYKm9R/vHraz9jCHSly0WSUPs0pxvIVD6YjLMwS5xgx2AmmCTRF8IQPUAng1vaCNpmt6ItjGNbO { "Records": [], "NextShardIterator": "AAAAAAAAAAFlcnzQhBsDNI27oBJiHorl74FEBAw+5rkjD2LoMgHc/Nwqhz0NNpobQJwMGdBkST4HM2rLGLKOlZ+qMRzn9soWpwFpfEbyItCvm6eA9od8f5mXIrqBivfdxoeg8R4pxfAvIlo2IySo2LnaocizN6ECr62O2j4VxwTHh6Fy1KlaSKyrZexUv0KfmrrxxBk82JyKu+DM+++Mgvnn4cBwwKJ3", "MillisBehindLatest": 38149000 } $ aws kinesis get-records --region us-west-2 --output json --shard-iterator AAAAAAAAAAGG2Kp28GH2eyxoBR7xYwXyOIf4ouPikGOrk1BCBxABK+1a5rxqGPYYY3/5exN7ql8+ODFaDFPBg589+Glenx20N1jE8fxyKIzjKWyIkV7apM7u/8yAdI1UrFGo4kD5MawqpFbf8IxgAwySce1qAe9zkpw3Rp4KbzGyFRa+/Jn3cjvjnJSLT+5eGtYDnJdq11KAXT7amGSip0VBjhcgtPk/ { "Records": [ { "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjoyMDAsInNpemUiOjExNzY0LCJyZWZlcmVyIjpudWxsLCJhZ2VudCI6ImN1cmwvNy4zNS4wIn0=", "PartitionKey": "a11392987a071f957ab3505bfc25cace", "ApproximateArrivalTimestamp": 1458397403.342, "SequenceNumber": "49559948046087405716686625491085803253240564075691245570" }, { "Data": "84mawgogNDJmZTg3OWMzOGQ2YmQ5YjM4ZTAyODk5Y2IwZDBhOTAKIDRlMzQ2NDFjYmEwZGFkOGZlZGFiMGNiN2Y3NzVlMjRiCiBmZDg0NGRkZTUyNWI3OTVlYjA3NGIyNmY4ZTlhMzRhOBp/CAAae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAEae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAIae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifWI6nYO/8w3iHbNp9OPICuY=", "PartitionKey": "a", "ApproximateArrivalTimestamp": 1458397523.517, "SequenceNumber": "49559948046087405716686625493830064863765780548611473410" }, :(略) ], "NextShardIterator": "AAAAAAAAAAGwSuYZZGtgEmrq/oW3ln+uK47Crz2bX569pLpjCQkTc27Et8rTKjT3s8QmGXIon4iXlBP12Jx3e3uFtAmnUpTYoiIoGXLBngxFyQc/d8ZYQ54PE+AF/whhBKoy5FNE6uENHeP7uBMqjCYq2G3hH55ZT2TFjVOhKC9Q0nRiN5+Gn+5YCCZIIvM/2a7hF5Ec/pOxJ6anG1EqDZT9mml+Yj9O", "MillisBehindLatest": 6215000 }
KPLは、Protocol Buffers形式でレコードを集約するライブラリです。フォーマットは KPLのGitHubが参考になります。
Kinesis StreamsのレコードはBASE64エンコードされているので、デコードしてみると様子が確認できます。
$ echo '84mawgogNDJmZTg3OWMzOGQ2YmQ5YjM4ZTAyODk5Y2IwZDBhOTAKIDRlMzQ2NDFjYmEwZGFkOGZlZGFiMGNiN2Y3NzVlMjRiCiBmZDg0NGRkZTUyNWI3OTVlYjA3NGIyNmY4ZTlhMzRhOBp/CAAae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAEae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAIae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifWI6nYO/8w3iHbNp9OPICuY=' | base64 -D �� 42fe879c38d6bd9b38e02899cb0d0a90 4e34641cba0dad8fedab0cb7f775e24b fd844dde525b795eb074b26f8e9a34a8{{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"}{{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"{{��i���:"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"}b:���� �$
複数のレコードが入っていますね! KPLで集約されたレコードは、同じくKPLで解除する必要があります。最近、Kinesis StreamからPullするKPL対応のLambdaのサンプルコードが公開されたので、手軽に使うのであればこれを利用するのが良いでしょう。
まとめ
FirehoseとKPLに対応し、大幅にパワーアップしたfluent-kinesis-plugin
をご紹介しました。歴史のあるプラグインで、かつ日本人エンジニアがコミッタ *1のプラグインなので引き続き応援していきたいです!
個人的には、これまで有用とわかっていながらJavaしか実装が無く手が出せなかったKPLが、Lambdaのコードとセットで一気に身近になった印象です。KPLもバシバシ検討していきましょう!
脚注
- imaifactoryさんからriywoさんに引き継がれたようです ↩